package com.bw.gmall.realtime.app.dwd;

import com.bw.gmall.realtime.utils.MyKafkaUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Flink SQL job that extracts "favor add" records from the {@code topic_db}
 * table and writes them to the {@code dwd_interaction_favor_add} table.
 *
 * <p>Rows are kept when {@code table = 'favor_info'} and the change is either
 * an {@code insert}, or an {@code update} whose {@code data['is_cancel']} is
 * {@code '0'}.
 *
 * <p>The connector clauses of both tables come from {@link MyKafkaUtil}; this
 * class only supplies the column lists. Presumably those helpers produce
 * Kafka connector options (the step comments in this job refer to Kafka) --
 * confirm against {@code MyKafkaUtil}.
 */
public class DwdInteractionFavorAdd {

    /** Column list of the {@code topic_db} source table; the connector clause is appended at runtime. */
    private static final String SOURCE_COLUMNS_DDL =
            "create table topic_db ("
                    + "`database` string, "
                    + "`table` string, "
                    + "`type` string, "
                    + "`data` map<string,string>, "
                    + "`ts` string "
                    + ")";

    /**
     * Projection and filter selecting favor-add rows from {@code topic_db}.
     *
     * <p>NOTE(review): the formatted {@code create_time} date is aliased
     * {@code data_id}; this may have been intended as {@code date_id}.
     * Confirm with downstream consumers before renaming, since the alias
     * must stay in step with the sink column of the same name.
     */
    private static final String FAVOR_ADD_QUERY =
            "select "
                    + "data['id'] id, "
                    + "data['user_id'] user_id, "
                    + "data['sku_id'] sku_id, "
                    + "date_format(data['create_time'],'yyyy-MM-dd') data_id, "
                    + "data['create_time'] create_time, "
                    + "ts "
                    + "from topic_db "
                    + "where `table` = 'favor_info' "
                    + "and (`type` = 'insert' or (`type` = 'update' and data['is_cancel'] = '0'))";

    /** Column list of the {@code dwd_interaction_favor_add} sink table; the connector clause is appended at runtime. */
    private static final String SINK_COLUMNS_DDL =
            "create table dwd_interaction_favor_add("
                    + "id string, "
                    + "user_id string, "
                    + "sku_id string, "
                    + "data_id string, "
                    + "create_time string, "
                    + "ts string "
                    + ")";

    /** Statement that copies every row of the {@code favor_info} view into the sink table. */
    private static final String INSERT_SQL =
            "insert into dwd_interaction_favor_add select * from favor_info";

    /**
     * Builds the table environment, registers the source table, the filtered
     * view and the sink table, then submits the insert statement.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        // Step 1: prepare the execution environment (parallelism fixed to 1).
        final StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        execEnv.setParallelism(1);
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(execEnv);

        // Step 3: register the topic_db source table (business data).
        final String sourceDdl =
                SOURCE_COLUMNS_DDL + MyKafkaUtil.getKafkaDDL("topic_db", "dwd_interaction_favor_add");
        tableEnv.executeSql(sourceDdl);

        // Step 4: select the favor_info rows and expose them as a temporary view.
        final Table favorAdds = tableEnv.sqlQuery(FAVOR_ADD_QUERY);
        tableEnv.createTemporaryView("favor_info", favorAdds);

        // Step 5: register the dwd_interaction_favor_add sink table.
        final String sinkDdl =
                SINK_COLUMNS_DDL + MyKafkaUtil.getKafkaSinkDDL("dwd_interaction_favor_add");
        tableEnv.executeSql(sinkDdl);

        // Step 6: write the view's rows into the sink table.
        tableEnv.executeSql(INSERT_SQL);
    }
}
